Hexagonal Architecture
# This is how most Python services start. It looks completely reasonable.
# services/document_service.py
import boto3
import requests
from fastapi import UploadFile
from sqlalchemy.orm import Session

from models import Document  # SQLAlchemy ORM model


async def process_document(file: UploadFile, db: Session) -> dict:
    # Read from HTTP request (FastAPI dependency)
    content = await file.read()

    # Call an ML classifier (hardwired external call)
    resp = requests.post("http://classifier-service/classify", data=content)
    category = resp.json()["category"]

    # Store in PostgreSQL via ORM (hardwired database)
    doc = Document(filename=file.filename, content=content, category=category)
    db.add(doc)
    db.commit()

    # Archive to S3 (hardwired cloud dependency)
    s3 = boto3.client("s3")
    s3.put_object(Bucket="documents", Key=file.filename, Body=content)

    return {"id": doc.id, "category": category}
To test this function you need: a running FastAPI app, a PostgreSQL instance, a live classifier-service, and real AWS credentials. Changing the database means editing business logic. Moving from FastAPI to a CLI means rewriting this function. Swapping S3 for a local filesystem for development means adding if os.environ.get("ENV") == "dev" branches.
Hexagonal Architecture - also called Ports and Adapters - is the structural pattern that prevents this. Business logic sits in a technology-free core. All I/O contact happens through swappable adapters.
What You Will Learn
- Why classic layered architecture creates hard coupling to frameworks and databases
- How to draw the hexagon and understand which side is which
- How to write a domain core with zero non-stdlib imports
- How to write driving adapters (FastAPI, CLI, test harness) that call into the core
- How to write driven adapters (PostgreSQL, S3, in-memory) that the core calls out to
- How to wire everything together in a composition root
- How to achieve a three-tier testing strategy: no-I/O unit tests, adapter integration tests, and full end-to-end tests
Prerequisites
- Understand the Dependency Inversion Principle (Lesson 04)
- Understand Repositories and Domain Events from DDD (Lesson 05)
- Familiar with FastAPI, SQLAlchemy, and typing.Protocol
- Have felt the pain of a service that is hard to test
The running example throughout this lesson is a document processing service: the user uploads a PDF, the system extracts text, classifies the document, and stores the result.
Part 1 - The Problem with Layered Architecture
Classic Python web architecture looks like this:
HTTP Request
↓
views.py / routers.py
↓
services.py
↓
models.py (SQLAlchemy)
↓
Database
Every layer depends on the layer below. The problem is that services.py ends up importing from models.py directly, which means it imports SQLAlchemy. It also imports requests or boto3 directly, because there is no other place for them.
A Real Layered Violation
# services/document_processor.py - a typical layered service
from __future__ import annotations
import io
import logging
from typing import Optional
import boto3
import requests
from sqlalchemy.orm import Session
from models.document import DocumentORM # SQLAlchemy model
from models.classification import ClassificationORM
logger = logging.getLogger(__name__)
class DocumentProcessorService:
"""High-level business logic tangled with I/O dependencies."""
def __init__(self, db: Session) -> None:
self._db = db # SQLAlchemy session - hardwired
self._s3 = boto3.client("s3") # boto3 - hardwired
self._classifier_url = "http://classifier/" # external HTTP - hardwired
def process(self, filename: str, content: bytes, user_id: str) -> dict:
# Business rule: PDFs only
if not filename.endswith(".pdf"):
raise ValueError("Only PDF files are supported.")
# Business rule: max 50MB
if len(content) > 50 * 1024 * 1024:
raise ValueError("File too large. Maximum is 50 MB.")
# External call: classifier (hardwired HTTP)
resp = requests.post(
f"{self._classifier_url}classify",
data=content,
timeout=30,
)
resp.raise_for_status()
category = resp.json()["category"]
confidence = resp.json()["confidence"]
# External call: store in PostgreSQL (hardwired ORM)
doc = DocumentORM(
filename=filename,
user_id=user_id,
category=category,
confidence=confidence,
size_bytes=len(content),
)
self._db.add(doc)
self._db.flush()
doc_id = doc.id
# External call: archive to S3 (hardwired boto3)
self._s3.put_object(
Bucket="documents-archive",
Key=f"{user_id}/{doc_id}/{filename}",
Body=content,
)
self._db.commit()
logger.info(f"Processed document {doc_id}: {category} ({confidence:.2%})")
return {"id": doc_id, "category": category, "confidence": confidence}
What makes this untestable and inflexible?
| Problem | Consequence |
|---|---|
| `from models.document import DocumentORM` | Cannot test without a database schema |
| `boto3.client("s3")` in `__init__` | Cannot test without AWS credentials |
| `requests.post(...)` hardwired | Cannot test without a running classifier service |
| Business rules (PDF only, 50 MB) buried | Cannot verify rules without all I/O available |
| No way to swap S3 for local filesystem | Dev environment requires real AWS |
| FastAPI-specific Session injection | Cannot use from a CLI or a test |
Part 2 - Ports and Adapters Model
Alistair Cockburn invented Hexagonal Architecture in 2005 with a simple insight: treat every external system - database, HTTP client, message queue, filesystem - symmetrically, and separate them all from the business logic via explicit interfaces called Ports.
┌───────────────────────────────┐
│ │
HTTP Request ────────┤ ┌──────────────────────┐ │
(Driving Adapter) │ │ │ │
│ │ Domain Core │ │
CLI Command ─────────┤ │ (the hexagon) ├────┤──── PostgreSQL
(Driving Adapter) │ │ │ │ (Driven Adapter)
│ │ Pure Python │ │
Test Harness ────────┤ │ No frameworks ├────┤──── S3
(Driving Adapter) │ │ No ORM imports │ │ (Driven Adapter)
│ │ No boto3 │ │
Message Queue ───────┤ │ ├────┤──── Classifier API
(Driving Adapter) │ └──────────────────────┘ │ (Driven Adapter)
│ │
│ ←── Ports (Protocols) ──────→│
└───────────────────────────────┘
Driving Side vs Driven Side
| Side | Also called | Initiates the call | Examples |
|---|---|---|---|
| Driving (left) | Primary, inbound | External world calls the core | FastAPI route, CLI command, test, cron job |
| Driven (right) | Secondary, outbound | Core calls external world | Database, S3, email, classifier API, message queue |
Ports
A Port is a Python Protocol (or ABC) that defines the interface between the core and the outside world. Ports live in the domain core. Concrete implementations (Adapters) live in the infrastructure layer.
# core/ports.py - the boundary. No framework imports here.
from __future__ import annotations

from typing import Protocol, Optional

from core.domain import Document, ClassificationResult


# ── Driven Ports (the core calls these) ──────────────────────────────────────

class DocumentStorage(Protocol):
    """Save and retrieve documents. Could be PostgreSQL, SQLite, or in-memory."""

    def save(self, document: Document) -> None: ...
    def get(self, document_id: str) -> Optional[Document]: ...
    def find_by_user(self, user_id: str) -> list[Document]: ...


class FileArchive(Protocol):
    """Store raw file bytes. Could be S3, GCS, or local filesystem."""

    def upload(self, key: str, content: bytes, content_type: str) -> str: ...
    def download(self, key: str) -> bytes: ...


class TextExtractor(Protocol):
    """Extract plain text from raw bytes. Could be pdfminer, tika, or a mock."""

    def extract(self, content: bytes, filename: str) -> str: ...


class DocumentClassifier(Protocol):
    """Classify a document. Could be an HTTP microservice, local model, or mock."""

    def classify(self, text: str) -> ClassificationResult: ...
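Because ports are Protocols, an adapter satisfies one structurally: it only needs matching methods, not a shared base class. The sketch below illustrates this with a cut-down copy of the DocumentClassifier port. KeywordClassifier and categorize are hypothetical names invented for this example, and runtime_checkable is added only so that isinstance() works in the demo; the ports above do not use it.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class ClassificationResult:
    category: str
    confidence: float


@runtime_checkable  # assumption: added only so isinstance() works in this demo
class DocumentClassifier(Protocol):
    def classify(self, text: str) -> ClassificationResult: ...


class KeywordClassifier:
    """Hypothetical adapter. Note that it does not inherit from the port."""

    def classify(self, text: str) -> ClassificationResult:
        if "invoice" in text.lower():
            return ClassificationResult("invoice", 0.9)
        return ClassificationResult("other", 0.5)


def categorize(classifier: DocumentClassifier, text: str) -> str:
    """Core-side code depends only on the port, never on a concrete class."""
    return classifier.classify(text).category


adapter = KeywordClassifier()
print(isinstance(adapter, DocumentClassifier))  # structural check: methods only
print(categorize(adapter, "Invoice #42"))
```

A static type checker such as mypy performs the same structural check at analysis time, including signatures, which the runtime isinstance() check does not verify.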
Part 3 - The Domain Core
The core is the hexagon. It contains your business logic, domain types, and port definitions. It imports nothing except the Python standard library and your own domain types.
Domain Types
# core/domain.py - zero non-stdlib imports
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import uuid
def _now() -> datetime:
return datetime.now(timezone.utc)
def _new_id() -> str:
return str(uuid.uuid4())
@dataclass(frozen=True)
class ClassificationResult:
category: str
confidence: float
def __post_init__(self) -> None:
if not 0.0 <= self.confidence <= 1.0:
raise ValueError(f"Confidence must be in [0.0, 1.0], got {self.confidence}")
if not self.category.strip():
raise ValueError("Category cannot be empty")
@dataclass
class Document:
"""
Domain entity. Has identity (document_id) and a lifecycle.
No SQLAlchemy, no FastAPI, no boto3 - pure Python.
"""
document_id: str = field(default_factory=_new_id)
filename: str = ""
user_id: str = ""
size_bytes: int = 0
category: str = ""
confidence: float = 0.0
archive_key: str = ""
status: str = "pending"
created_at: datetime = field(default_factory=_now)
processed_at: Optional[datetime] = None
def mark_processed(
self,
classification: ClassificationResult,
archive_key: str,
) -> None:
if self.status != "pending":
raise ValueError(f"Cannot process a {self.status!r} document.")
self.category = classification.category
self.confidence = classification.confidence
self.archive_key = archive_key
self.status = "processed"
self.processed_at = _now()
def mark_failed(self, reason: str) -> None:
# NOTE: `reason` is accepted but not stored; add a field if failures must be auditable.
self.status = "failed"
@property
def is_processed(self) -> bool:
return self.status == "processed"
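To see why keeping these rules inside the domain types pays off, here is a minimal sketch of the lifecycle in action. It uses a trimmed copy of the two classes above (only the fields the transition touches) so that it runs on its own; the file name and archive key are made-up values.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class ClassificationResult:
    category: str
    confidence: float

    def __post_init__(self) -> None:
        if not 0.0 <= self.confidence <= 1.0:
            raise ValueError(f"Confidence must be in [0.0, 1.0], got {self.confidence}")


@dataclass
class Document:
    """Trimmed copy of the entity: only the fields the lifecycle touches."""

    filename: str = ""
    category: str = ""
    confidence: float = 0.0
    archive_key: str = ""
    status: str = "pending"
    processed_at: Optional[datetime] = None

    def mark_processed(self, classification: ClassificationResult, archive_key: str) -> None:
        if self.status != "pending":
            raise ValueError(f"Cannot process a {self.status!r} document.")
        self.category = classification.category
        self.confidence = classification.confidence
        self.archive_key = archive_key
        self.status = "processed"
        self.processed_at = datetime.now(timezone.utc)


doc = Document(filename="contract.pdf")
doc.mark_processed(ClassificationResult("contract", 0.97), "user-1/abc/contract.pdf")
print(doc.status)

# A second transition is rejected by the entity itself, not by a database constraint.
try:
    doc.mark_processed(ClassificationResult("invoice", 0.5), "other-key")
except ValueError as exc:
    print(f"rejected: {exc}")

# An invalid value object cannot be constructed at all.
try:
    ClassificationResult("contract", 1.5)
except ValueError as exc:
    print(f"rejected: {exc}")
```

Nothing here needs a database, a web framework, or a network connection, which is what makes these rules cheap to test.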
Core Business Logic (the Use Case)
# core/use_cases.py - the heart of the application
from __future__ import annotations
import logging
import os
from core.domain import Document, ClassificationResult
from core.ports import DocumentStorage, FileArchive, TextExtractor, DocumentClassifier
logger = logging.getLogger(__name__)
_MAX_FILE_BYTES = 50 * 1024 * 1024 # 50 MB
_ALLOWED_EXTENSIONS = {".pdf", ".docx", ".txt"}
class DocumentProcessingError(Exception):
pass
class ProcessDocumentUseCase:
"""
Orchestrates: validate → extract text → classify → archive → persist.
Imports: only the standard library (logging, os), core.domain, and core.ports.
All I/O happens through injected ports.
This class is 100% testable with no external services running.
"""
def __init__(
self,
storage: DocumentStorage,
archive: FileArchive,
extractor: TextExtractor,
classifier: DocumentClassifier,
) -> None:
self._storage = storage
self._archive = archive
self._extractor = extractor
self._classifier = classifier
def execute(
self,
filename: str,
content: bytes,
user_id: str,
content_type: str = "application/pdf",
) -> Document:
# ── Business rules (testable without any I/O) ─────────────────────────
ext = os.path.splitext(filename)[1].lower()
if ext not in _ALLOWED_EXTENSIONS:
raise DocumentProcessingError(
f"Unsupported file type: {ext!r}. "
f"Allowed: {', '.join(sorted(_ALLOWED_EXTENSIONS))}"
)
if len(content) == 0:
raise DocumentProcessingError("File content cannot be empty.")
if len(content) > _MAX_FILE_BYTES:
raise DocumentProcessingError(
f"File too large: {len(content):,} bytes. "
f"Maximum allowed: {_MAX_FILE_BYTES:,} bytes."
)
# ── Create domain object ──────────────────────────────────────────────
doc = Document(
filename=filename,
user_id=user_id,
size_bytes=len(content),
)
try:
# ── Driven port: extract text ─────────────────────────────────────
text = self._extractor.extract(content, filename)
if not text.strip():
raise DocumentProcessingError("Could not extract any text from the file.")
# ── Driven port: classify ─────────────────────────────────────────
result: ClassificationResult = self._classifier.classify(text)
# ── Driven port: archive raw file ─────────────────────────────────
archive_key = f"{user_id}/{doc.document_id}/{filename}"
self._archive.upload(archive_key, content, content_type)
# ── Update domain object ──────────────────────────────────────────
doc.mark_processed(result, archive_key)
except DocumentProcessingError:
doc.mark_failed("Processing error")
self._storage.save(doc)
raise
except Exception as exc:
doc.mark_failed(str(exc))
self._storage.save(doc)
raise DocumentProcessingError(f"Unexpected error: {exc}") from exc
# ── Driven port: persist ──────────────────────────────────────────────
self._storage.save(doc)
logger.info(
"Document processed",
extra={
"document_id": doc.document_id,
"category": doc.category,
"confidence": doc.confidence,
"user_id": user_id,
},
)
return doc
class GetDocumentUseCase:
def __init__(self, storage: DocumentStorage) -> None:
self._storage = storage
def execute(self, document_id: str, requesting_user_id: str) -> Document:
doc = self._storage.get(document_id)
if doc is None:
raise DocumentProcessingError(f"Document {document_id!r} not found.")
if doc.user_id != requesting_user_id:
raise PermissionError("You do not have access to this document.")
return doc
Notice what ProcessDocumentUseCase does not import: no boto3, no sqlalchemy, no fastapi, no requests. It is a plain Python class with plain Python types. Every decision it makes can be tested by instantiating it with fake ports.
Part 4 - Driving Adapters
A driving adapter receives input from the outside world and translates it into a call on the core's use cases. It knows about the framework - FastAPI, CLI, message queue - but the core knows nothing about the driver.
FastAPI Adapter
# adapters/driving/fastapi_router.py
from __future__ import annotations
from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from core.use_cases import ProcessDocumentUseCase, GetDocumentUseCase, DocumentProcessingError
from adapters.driving.dependencies import get_process_use_case, get_document_use_case
router = APIRouter(prefix="/documents", tags=["documents"])
class DocumentResponse(BaseModel):
document_id: str
filename: str
category: str
confidence: float
status: str
archive_key: str
@router.post("/", response_model=DocumentResponse, status_code=201)
async def upload_document(
file: UploadFile = File(...),
user_id: str = "anonymous", # in production: extracted from JWT
use_case: ProcessDocumentUseCase = Depends(get_process_use_case),
) -> JSONResponse:
content = await file.read()
try:
doc = use_case.execute(
filename=file.filename or "upload",
content=content,
user_id=user_id,
content_type=file.content_type or "application/octet-stream",
)
except DocumentProcessingError as e:
raise HTTPException(status_code=422, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=403, detail=str(e))
return JSONResponse(
status_code=201,
content={
"document_id": doc.document_id,
"filename": doc.filename,
"category": doc.category,
"confidence": doc.confidence,
"status": doc.status,
"archive_key": doc.archive_key,
},
)
@router.get("/{document_id}", response_model=DocumentResponse)
def get_document(
document_id: str,
user_id: str = "anonymous",
use_case: GetDocumentUseCase = Depends(get_document_use_case),
) -> JSONResponse:
try:
doc = use_case.execute(document_id, user_id)
except DocumentProcessingError as e:
raise HTTPException(status_code=404, detail=str(e))
except PermissionError as e:
raise HTTPException(status_code=403, detail=str(e))
return JSONResponse(content={
"document_id": doc.document_id,
"filename": doc.filename,
"category": doc.category,
"confidence": doc.confidence,
"status": doc.status,
"archive_key": doc.archive_key,
})
# adapters/driving/dependencies.py
# FastAPI dependency injection - wires adapters to use cases
from functools import lru_cache
from core.use_cases import ProcessDocumentUseCase, GetDocumentUseCase
from adapters.driven.postgres_storage import PostgresDocumentStorage
from adapters.driven.s3_archive import S3FileArchive
from adapters.driven.pdfminer_extractor import PdfMinerTextExtractor
from adapters.driven.http_classifier import HttpDocumentClassifier
import os
@lru_cache
def _get_storage() -> PostgresDocumentStorage:
return PostgresDocumentStorage(dsn=os.environ["DATABASE_URL"])
@lru_cache
def _get_archive() -> S3FileArchive:
return S3FileArchive(bucket=os.environ["S3_BUCKET"])
@lru_cache
def _get_extractor() -> PdfMinerTextExtractor:
return PdfMinerTextExtractor()
@lru_cache
def _get_classifier() -> HttpDocumentClassifier:
return HttpDocumentClassifier(base_url=os.environ["CLASSIFIER_URL"])
def get_process_use_case() -> ProcessDocumentUseCase:
return ProcessDocumentUseCase(
storage=_get_storage(),
archive=_get_archive(),
extractor=_get_extractor(),
classifier=_get_classifier(),
)
def get_document_use_case() -> GetDocumentUseCase:
return GetDocumentUseCase(storage=_get_storage())
CLI Adapter (for the Same Use Case)
# adapters/driving/cli.py
# The exact same use case, driven by a command-line interface.
# No FastAPI, no HTTP - but the core is unchanged.
from __future__ import annotations
import argparse
import sys
from pathlib import Path
from core.use_cases import ProcessDocumentUseCase, DocumentProcessingError
from adapters.driven.postgres_storage import PostgresDocumentStorage
from adapters.driven.s3_archive import S3FileArchive
from adapters.driven.pdfminer_extractor import PdfMinerTextExtractor
from adapters.driven.http_classifier import HttpDocumentClassifier
import os
def build_use_case() -> ProcessDocumentUseCase:
return ProcessDocumentUseCase(
storage=PostgresDocumentStorage(dsn=os.environ["DATABASE_URL"]),
archive=S3FileArchive(bucket=os.environ["S3_BUCKET"]),
extractor=PdfMinerTextExtractor(),
classifier=HttpDocumentClassifier(base_url=os.environ["CLASSIFIER_URL"]),
)
def main() -> None:
parser = argparse.ArgumentParser(description="Process a document")
parser.add_argument("file", type=Path, help="Path to the PDF file")
parser.add_argument("--user", required=True, help="User ID")
args = parser.parse_args()
path: Path = args.file
if not path.exists():
print(f"Error: file not found: {path}", file=sys.stderr)
sys.exit(1)
content = path.read_bytes()
use_case = build_use_case()
try:
doc = use_case.execute(
filename=path.name,
content=content,
user_id=args.user,
)
print(f"Processed: {doc.document_id}")
print(f"Category: {doc.category} ({doc.confidence:.1%})")
except DocumentProcessingError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
main()
The core ProcessDocumentUseCase is called from both the HTTP handler and the CLI. Neither has any idea the other exists. The business logic is in exactly one place.
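The same holds for any further driver. As a rough illustration, a message-queue driving adapter could look like the sketch below. The message format (JSON with a base64-encoded content_b64 field) and the names handle_message and RecordingUseCase are assumptions made for this example; RecordingUseCase merely stands in for ProcessDocumentUseCase so the snippet runs without the rest of the project.

```python
from __future__ import annotations

import base64
import json


class RecordingUseCase:
    """Stand-in for ProcessDocumentUseCase so the sketch runs on its own."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, bytes, str, str]] = []

    def execute(
        self,
        filename: str,
        content: bytes,
        user_id: str,
        content_type: str = "application/pdf",
    ) -> None:
        self.calls.append((filename, content, user_id, content_type))


def handle_message(raw_message: str, use_case) -> None:
    """Hypothetical queue consumer: translate one message into one use-case call."""
    payload = json.loads(raw_message)
    use_case.execute(
        filename=payload["filename"],
        content=base64.b64decode(payload["content_b64"]),  # assumed wire format
        user_id=payload["user_id"],
    )


message = json.dumps({
    "filename": "report.pdf",
    "content_b64": base64.b64encode(b"%PDF-1.4 fake").decode("ascii"),
    "user_id": "user-7",
})
use_case = RecordingUseCase()
handle_message(message, use_case)
print(use_case.calls[0][0], use_case.calls[0][2])
```

The adapter's whole job is translation: decode the transport format, call execute, and leave every business decision to the core.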
Part 5 - Driven Adapters
Driven adapters implement the Ports defined in the core. They are responsible for all I/O. The core never imports them - they are injected at the composition root.
PostgreSQL Storage Adapter
# adapters/driven/postgres_storage.py
from __future__ import annotations
from typing import Optional
from datetime import datetime
import json
import psycopg2
import psycopg2.extras
from core.domain import Document
from core.ports import DocumentStorage
class PostgresDocumentStorage:
"""Implements DocumentStorage Port using PostgreSQL via psycopg2."""
DDL = """
CREATE TABLE IF NOT EXISTS documents (
document_id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
user_id TEXT NOT NULL,
size_bytes INTEGER NOT NULL,
category TEXT NOT NULL DEFAULT '',
confidence REAL NOT NULL DEFAULT 0.0,
archive_key TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'pending',
created_at TIMESTAMPTZ NOT NULL,
processed_at TIMESTAMPTZ
)
"""
def __init__(self, dsn: str) -> None:
self._dsn = dsn
self._ensure_schema()
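def _connect(self):
# NOTE: psycopg2's `with conn:` block commits or rolls back the transaction but does
# not close the connection; production code would normally use a pool or close it explicitly.
return psycopg2.connect(self._dsn, cursor_factory=psycopg2.extras.RealDictCursor)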
def _ensure_schema(self) -> None:
with self._connect() as conn, conn.cursor() as cur:
cur.execute(self.DDL)
def save(self, document: Document) -> None:
sql = """
INSERT INTO documents
(document_id, filename, user_id, size_bytes, category,
confidence, archive_key, status, created_at, processed_at)
VALUES
(%(document_id)s, %(filename)s, %(user_id)s, %(size_bytes)s,
%(category)s, %(confidence)s, %(archive_key)s, %(status)s,
%(created_at)s, %(processed_at)s)
ON CONFLICT (document_id) DO UPDATE SET
category = EXCLUDED.category,
confidence = EXCLUDED.confidence,
archive_key = EXCLUDED.archive_key,
status = EXCLUDED.status,
processed_at = EXCLUDED.processed_at
"""
with self._connect() as conn, conn.cursor() as cur:
cur.execute(sql, {
"document_id": document.document_id,
"filename": document.filename,
"user_id": document.user_id,
"size_bytes": document.size_bytes,
"category": document.category,
"confidence": document.confidence,
"archive_key": document.archive_key,
"status": document.status,
"created_at": document.created_at,
"processed_at": document.processed_at,
})
def get(self, document_id: str) -> Optional[Document]:
sql = "SELECT * FROM documents WHERE document_id = %s"
with self._connect() as conn, conn.cursor() as cur:
cur.execute(sql, (document_id,))
row = cur.fetchone()
if row is None:
return None
return self._row_to_domain(dict(row))
def find_by_user(self, user_id: str) -> list[Document]:
sql = "SELECT * FROM documents WHERE user_id = %s ORDER BY created_at DESC"
with self._connect() as conn, conn.cursor() as cur:
cur.execute(sql, (user_id,))
rows = cur.fetchall()
return [self._row_to_domain(dict(r)) for r in rows]
@staticmethod
def _row_to_domain(row: dict) -> Document:
doc = Document(
document_id=row["document_id"],
filename=row["filename"],
user_id=row["user_id"],
size_bytes=row["size_bytes"],
category=row["category"],
confidence=row["confidence"],
archive_key=row["archive_key"],
status=row["status"],
created_at=row["created_at"],
processed_at=row["processed_at"],
)
return doc
S3 File Archive Adapter
# adapters/driven/s3_archive.py
from __future__ import annotations
import boto3
from botocore.exceptions import ClientError
from core.ports import FileArchive
class S3FileArchive:
"""Implements FileArchive Port using Amazon S3."""
def __init__(self, bucket: str, region: str = "us-east-1") -> None:
self._bucket = bucket
self._client = boto3.client("s3", region_name=region)
def upload(self, key: str, content: bytes, content_type: str) -> str:
self._client.put_object(
Bucket=self._bucket,
Key=key,
Body=content,
ContentType=content_type,
)
return f"s3://{self._bucket}/{key}"
def download(self, key: str) -> bytes:
try:
response = self._client.get_object(Bucket=self._bucket, Key=key)
return response["Body"].read()
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
raise FileNotFoundError(f"Key not found: {key!r}")
raise
HTTP Classifier Adapter
# adapters/driven/http_classifier.py
from __future__ import annotations
import requests
from core.domain import ClassificationResult
from core.ports import DocumentClassifier
class HttpDocumentClassifier:
"""Implements DocumentClassifier Port via HTTP microservice."""
def __init__(self, base_url: str, timeout: int = 30) -> None:
self._base_url = base_url.rstrip("/")
self._timeout = timeout
def classify(self, text: str) -> ClassificationResult:
response = requests.post(
f"{self._base_url}/classify",
json={"text": text},
timeout=self._timeout,
)
response.raise_for_status()
data = response.json()
return ClassificationResult(
category=data["category"],
confidence=float(data["confidence"]),
)
class LocalMLClassifier:
"""Implements DocumentClassifier Port using a local model - no HTTP."""
def __init__(self, model_path: str) -> None:
# Load the model at startup, not per-call
import pickle
with open(model_path, "rb") as f:
self._model = pickle.load(f)
def classify(self, text: str) -> ClassificationResult:
prediction = self._model.predict([text])[0]
probability = self._model.predict_proba([text]).max()
return ClassificationResult(category=str(prediction), confidence=float(probability))
pdfminer Text Extractor Adapter
# adapters/driven/pdfminer_extractor.py
from __future__ import annotations
import io
from core.ports import TextExtractor
class PdfMinerTextExtractor:
"""Implements TextExtractor Port using pdfminer.six."""
def extract(self, content: bytes, filename: str) -> str:
if filename.lower().endswith(".txt"):
return content.decode("utf-8", errors="replace")
from pdfminer.high_level import extract_text_to_fp
from pdfminer.layout import LAParams
output = io.StringIO()
extract_text_to_fp(
io.BytesIO(content),
output,
laparams=LAParams(),
)
return output.getvalue()
In-Memory Adapters (for Tests)
These are driven adapters that satisfy all the Port protocols with no external services.
# tests/fakes.py
from __future__ import annotations
from typing import Optional
from core.domain import Document, ClassificationResult
from core.ports import DocumentStorage, FileArchive, TextExtractor, DocumentClassifier
class InMemoryDocumentStorage:
def __init__(self) -> None:
self._store: dict[str, Document] = {}
def save(self, document: Document) -> None:
self._store[document.document_id] = document
def get(self, document_id: str) -> Optional[Document]:
return self._store.get(document_id)
def find_by_user(self, user_id: str) -> list[Document]:
return [d for d in self._store.values() if d.user_id == user_id]
def all(self) -> list[Document]:
return list(self._store.values())
class InMemoryFileArchive:
def __init__(self) -> None:
self._blobs: dict[str, bytes] = {}
def upload(self, key: str, content: bytes, content_type: str) -> str:
self._blobs[key] = content
return f"memory://{key}"
def download(self, key: str) -> bytes:
if key not in self._blobs:
raise FileNotFoundError(f"Key not found: {key!r}")
return self._blobs[key]
def contains(self, key: str) -> bool:
return key in self._blobs
class StubTextExtractor:
"""Returns a fixed text string - controllable from tests."""
def __init__(self, text: str = "Stub extracted text for testing purposes.") -> None:
self._text = text
def extract(self, content: bytes, filename: str) -> str:
return self._text
class StubDocumentClassifier:
"""Returns a fixed classification - controllable from tests."""
def __init__(
self,
category: str = "contract",
confidence: float = 0.95,
) -> None:
self._result = ClassificationResult(category=category, confidence=confidence)
def classify(self, text: str) -> ClassificationResult:
return self._result
class FailingClassifier:
"""Always fails - for testing error paths."""
def classify(self, text: str) -> ClassificationResult:
raise RuntimeError("Classifier service unavailable")
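Fakes are only useful if they behave like the real adapters. One possible way to keep them honest is a small contract check that can be run against any FileArchive implementation, fake or real. The helper name check_file_archive_contract is hypothetical, and the InMemoryFileArchive below is a copy of the fake above so the sketch is self-contained.

```python
from __future__ import annotations


class InMemoryFileArchive:
    """Copy of the fake above, repeated so this sketch runs on its own."""

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def upload(self, key: str, content: bytes, content_type: str) -> str:
        self._blobs[key] = content
        return f"memory://{key}"

    def download(self, key: str) -> bytes:
        if key not in self._blobs:
            raise FileNotFoundError(f"Key not found: {key!r}")
        return self._blobs[key]


def check_file_archive_contract(archive) -> None:
    """Hypothetical helper: behaviour every FileArchive adapter should share."""
    key = "user-1/doc-1/report.pdf"
    archive.upload(key, b"payload", "application/pdf")

    # What was uploaded must come back byte-for-byte.
    assert archive.download(key) == b"payload"

    # A missing key must surface as FileNotFoundError, as in the S3 adapter.
    try:
        archive.download("missing/key")
    except FileNotFoundError:
        pass
    else:
        raise AssertionError("download() of a missing key should raise FileNotFoundError")


check_file_archive_contract(InMemoryFileArchive())
print("InMemoryFileArchive satisfies the contract")
```

Running the same function against S3FileArchive in an integration test would catch cases where the fake and the real adapter drift apart.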
Part 6 - Wiring It Together
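The composition root is the one place that knows about all the concrete classes and connects them. Ideally this is a single file - main.py or app.py. In this example the FastAPI wiring itself lives in adapters/driving/dependencies.py; main.py only assembles the app and includes the router.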
# main.py - the composition root for a FastAPI application
from __future__ import annotations
import os
import logging
import logging.config
from fastapi import FastAPI
from adapters.driving.fastapi_router import router as document_router
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(name)s %(levelname)s %(message)s",
)
def create_app() -> FastAPI:
app = FastAPI(
title="Document Processing Service",
description="Upload, classify, and archive documents.",
version="1.0.0",
)
app.include_router(document_router)
return app
app = create_app()
# main_test.py - composition root for tests
# Swap every driven adapter for an in-memory fake.
# Zero external services required.
from __future__ import annotations
from fastapi import FastAPI
from fastapi.testclient import TestClient
from adapters.driving.fastapi_router import router
from core.use_cases import ProcessDocumentUseCase, GetDocumentUseCase
from tests.fakes import (
InMemoryDocumentStorage,
InMemoryFileArchive,
StubTextExtractor,
StubDocumentClassifier,
)
# Build the app with fake adapters
_storage = InMemoryDocumentStorage()
_archive = InMemoryFileArchive()
_extractor = StubTextExtractor("Contract between parties A and B.")
_classifier = StubDocumentClassifier("contract", 0.97)
_process_uc = ProcessDocumentUseCase(
storage=_storage,
archive=_archive,
extractor=_extractor,
classifier=_classifier,
)
_get_uc = GetDocumentUseCase(storage=_storage)
# Override FastAPI dependencies
def _override_process() -> ProcessDocumentUseCase:
return _process_uc
def _override_get() -> GetDocumentUseCase:
return _get_uc
from adapters.driving.dependencies import get_process_use_case, get_document_use_case
app = FastAPI()
app.include_router(router)
app.dependency_overrides[get_process_use_case] = _override_process
app.dependency_overrides[get_document_use_case] = _override_get
client = TestClient(app)
Swapping Adapters at Runtime
# config.py - profile-based adapter selection
import os
from core.ports import DocumentStorage, FileArchive, TextExtractor, DocumentClassifier
from core.use_cases import ProcessDocumentUseCase
def build_use_case() -> ProcessDocumentUseCase:
profile = os.environ.get("PROFILE", "production")
if profile == "test":
from tests.fakes import (
InMemoryDocumentStorage, InMemoryFileArchive,
StubTextExtractor, StubDocumentClassifier,
)
return ProcessDocumentUseCase(
storage=InMemoryDocumentStorage(),
archive=InMemoryFileArchive(),
extractor=StubTextExtractor(),
classifier=StubDocumentClassifier(),
)
if profile == "local":
from adapters.driven.local_storage import LocalFileStorage
from adapters.driven.pdfminer_extractor import PdfMinerTextExtractor
from adapters.driven.http_classifier import HttpDocumentClassifier
from tests.fakes import InMemoryDocumentStorage # still fake DB for local dev
return ProcessDocumentUseCase(
storage=InMemoryDocumentStorage(),
archive=LocalFileStorage(base_dir="/tmp/doc-archive"),
extractor=PdfMinerTextExtractor(),
classifier=HttpDocumentClassifier("http://localhost:8001"),
)
# production
from adapters.driven.postgres_storage import PostgresDocumentStorage
from adapters.driven.s3_archive import S3FileArchive
from adapters.driven.pdfminer_extractor import PdfMinerTextExtractor
from adapters.driven.http_classifier import HttpDocumentClassifier
return ProcessDocumentUseCase(
storage=PostgresDocumentStorage(dsn=os.environ["DATABASE_URL"]),
archive=S3FileArchive(bucket=os.environ["S3_BUCKET"]),
extractor=PdfMinerTextExtractor(),
classifier=HttpDocumentClassifier(os.environ["CLASSIFIER_URL"]),
)
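The local profile above refers to a LocalFileStorage adapter that is not shown in this lesson. A minimal sketch of what it might look like follows, assuming it implements the FileArchive port by writing files under a base directory. The path-escape check and the file:// return value are choices made for this example, not something the original adapter is known to do.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


class LocalFileStorage:
    """Sketch of a FileArchive adapter backed by the local filesystem."""

    def __init__(self, base_dir: str) -> None:
        self._base = Path(base_dir).resolve()
        self._base.mkdir(parents=True, exist_ok=True)

    def _path_for(self, key: str) -> Path:
        path = (self._base / key).resolve()
        # Assumption: keys such as "../x" must not escape the base directory.
        if self._base not in path.parents:
            raise ValueError(f"Invalid archive key: {key!r}")
        return path

    def upload(self, key: str, content: bytes, content_type: str) -> str:
        # content_type is accepted to match the port but has no filesystem equivalent.
        path = self._path_for(key)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(content)
        return path.as_uri()

    def download(self, key: str) -> bytes:
        path = self._path_for(key)
        if not path.is_file():
            raise FileNotFoundError(f"Key not found: {key!r}")
        return path.read_bytes()


with tempfile.TemporaryDirectory() as tmp:
    archive = LocalFileStorage(base_dir=tmp)
    location = archive.upload("user-1/doc-1/report.pdf", b"%PDF-1.4 fake", "application/pdf")
    print(location.startswith("file://"))
    print(archive.download("user-1/doc-1/report.pdf"))
```

Because the class exposes the same upload and download methods as S3FileArchive, the use case cannot tell the two apart, which is what lets the profile switch stay out of the business logic.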
Part 7 - Testing Strategy
Hexagonal Architecture enables a three-tier testing pyramid where the most granular tests are also the fastest and most numerous.
Tier 1 - Unit Tests (Domain Core, No I/O)
These tests run in milliseconds. They test every business rule using in-memory fakes.
# tests/unit/test_process_document.py
import pytest
from core.use_cases import ProcessDocumentUseCase, DocumentProcessingError
from core.domain import Document
from tests.fakes import (
InMemoryDocumentStorage,
InMemoryFileArchive,
StubTextExtractor,
StubDocumentClassifier,
FailingClassifier,
)
PDF_CONTENT = b"%PDF-1.4 fake pdf content"
TXT_CONTENT = b"Plain text content for testing."
def _make_use_case(
extractor_text: str = "Valid document text.",
category: str = "contract",
confidence: float = 0.92,
):
storage = InMemoryDocumentStorage()
archive = InMemoryFileArchive()
extractor = StubTextExtractor(extractor_text)
classifier = StubDocumentClassifier(category, confidence)
uc = ProcessDocumentUseCase(storage, archive, extractor, classifier)
return uc, storage, archive
class TestValidation:
def test_rejects_unsupported_extension(self):
uc, _, _ = _make_use_case()
with pytest.raises(DocumentProcessingError, match="Unsupported file type"):
uc.execute("report.xlsx", PDF_CONTENT, "user-1")
def test_rejects_empty_content(self):
uc, _, _ = _make_use_case()
with pytest.raises(DocumentProcessingError, match="empty"):
uc.execute("report.pdf", b"", "user-1")
def test_rejects_oversized_file(self):
uc, _, _ = _make_use_case()
huge = b"x" * (51 * 1024 * 1024) # 51 MB
with pytest.raises(DocumentProcessingError, match="too large"):
uc.execute("report.pdf", huge, "user-1")
def test_rejects_blank_extracted_text(self):
storage = InMemoryDocumentStorage()
archive = InMemoryFileArchive()
extractor = StubTextExtractor("") # blank output
classifier = StubDocumentClassifier()
uc = ProcessDocumentUseCase(storage, archive, extractor, classifier)
with pytest.raises(DocumentProcessingError, match="extract"):
uc.execute("blank.pdf", PDF_CONTENT, "user-1")
class TestSuccessPath:
    def test_document_is_classified(self):
        uc, _, _ = _make_use_case(category="invoice", confidence=0.88)
        doc = uc.execute("inv.pdf", PDF_CONTENT, "user-1")
        assert doc.category == "invoice"
        assert abs(doc.confidence - 0.88) < 1e-6
        assert doc.status == "processed"

    def test_document_is_persisted(self):
        uc, storage, _ = _make_use_case()
        doc = uc.execute("report.pdf", PDF_CONTENT, "user-42")
        saved = storage.get(doc.document_id)
        assert saved is not None
        assert saved.user_id == "user-42"

    def test_file_is_archived(self):
        uc, _, archive = _make_use_case()
        doc = uc.execute("contract.pdf", PDF_CONTENT, "user-1")
        assert archive.contains(doc.archive_key.replace("memory://", ""))

    def test_archive_key_includes_user_and_document_id(self):
        uc, _, _ = _make_use_case()
        doc = uc.execute("my_doc.pdf", PDF_CONTENT, "user-99")
        assert "user-99" in doc.archive_key
        assert doc.document_id in doc.archive_key
        assert "my_doc.pdf" in doc.archive_key

    def test_txt_files_accepted(self):
        uc, _, _ = _make_use_case()
        doc = uc.execute("notes.txt", TXT_CONTENT, "user-1")
        assert doc.status == "processed"
class TestErrorPaths:
    def test_classifier_failure_marks_document_failed(self):
        storage = InMemoryDocumentStorage()
        archive = InMemoryFileArchive()
        extractor = StubTextExtractor("Some text")
        classifier = FailingClassifier()
        uc = ProcessDocumentUseCase(storage, archive, extractor, classifier)
        with pytest.raises(DocumentProcessingError):
            uc.execute("report.pdf", PDF_CONTENT, "user-1")
        # Document should be persisted in failed state
        docs = storage.all()
        assert len(docs) == 1
        assert docs[0].status == "failed"
class TestGetDocument:
    def test_returns_own_document(self):
        from core.use_cases import GetDocumentUseCase
        uc, storage, _ = _make_use_case()
        doc = uc.execute("report.pdf", PDF_CONTENT, "user-1")
        get_uc = GetDocumentUseCase(storage)
        retrieved = get_uc.execute(doc.document_id, "user-1")
        assert retrieved.document_id == doc.document_id

    def test_cannot_access_other_users_document(self):
        from core.use_cases import GetDocumentUseCase
        uc, storage, _ = _make_use_case()
        doc = uc.execute("report.pdf", PDF_CONTENT, "user-1")
        get_uc = GetDocumentUseCase(storage)
        with pytest.raises(PermissionError):
            get_uc.execute(doc.document_id, "user-2")  # wrong user

    def test_not_found_raises_error(self):
        from core.use_cases import GetDocumentUseCase
        storage = InMemoryDocumentStorage()
        get_uc = GetDocumentUseCase(storage)
        with pytest.raises(DocumentProcessingError, match="not found"):
            get_uc.execute("nonexistent-id", "user-1")
Tier 2 - Adapter Integration Tests (Real Infrastructure, Isolated)
These tests verify that each adapter correctly implements its Port against real infrastructure. They are slower but targeted.
# tests/integration/test_postgres_storage.py
import os

import psycopg2
import pytest

from core.domain import Document
from adapters.driven.postgres_storage import PostgresDocumentStorage


@pytest.fixture
def storage():
    dsn = os.environ.get("TEST_DATABASE_URL", "postgresql://test:test@localhost/test_docs")
    s = PostgresDocumentStorage(dsn=dsn)
    yield s
    # Cleanup: truncate test data. psycopg2's connection context manager
    # commits on exit but does not close the connection, so close it explicitly.
    conn = psycopg2.connect(dsn)
    try:
        with conn, conn.cursor() as cur:
            cur.execute("TRUNCATE documents")
    finally:
        conn.close()
def test_save_and_retrieve(storage: PostgresDocumentStorage):
    doc = Document(
        document_id="test-123",
        filename="test.pdf",
        user_id="user-1",
        size_bytes=1024,
    )
    storage.save(doc)
    retrieved = storage.get("test-123")
    assert retrieved is not None
    assert retrieved.filename == "test.pdf"
    assert retrieved.user_id == "user-1"


def test_upsert_updates_status(storage: PostgresDocumentStorage):
    from core.domain import ClassificationResult
    doc = Document(document_id="test-456", filename="x.pdf", user_id="u1", size_bytes=100)
    storage.save(doc)
    doc.mark_processed(ClassificationResult("invoice", 0.9), "u1/test-456/x.pdf")
    storage.save(doc)  # second save with the same ID must update, not duplicate
    retrieved = storage.get("test-456")
    assert retrieved is not None
    assert retrieved.status == "processed"
    assert retrieved.category == "invoice"


def test_find_by_user(storage: PostgresDocumentStorage):
    for i in range(3):
        doc = Document(document_id=f"u2-doc-{i}", filename=f"f{i}.pdf",
                       user_id="user-2", size_bytes=100)
        storage.save(doc)
    # Different user
    storage.save(Document(document_id="u3-doc", filename="other.pdf",
                          user_id="user-3", size_bytes=50))
    docs = storage.find_by_user("user-2")
    assert len(docs) == 3
    assert all(d.user_id == "user-2" for d in docs)
# tests/integration/test_s3_archive.py
import os

import pytest

from adapters.driven.s3_archive import S3FileArchive


@pytest.fixture
def archive():
    # Use localstack in CI: AWS_ENDPOINT_URL=http://localhost:4566
    return S3FileArchive(bucket=os.environ.get("TEST_S3_BUCKET", "test-docs"))


def test_upload_and_download(archive: S3FileArchive):
    content = b"Test PDF content for integration testing"
    key = "integration-test/sample.pdf"
    archive.upload(key, content, "application/pdf")
    downloaded = archive.download(key)
    assert downloaded == content


def test_download_missing_key_raises(archive: S3FileArchive):
    with pytest.raises(FileNotFoundError):
        archive.download("does/not/exist.pdf")
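Because the unit tests trust the in-memory fakes, it can help to run the same port-level checks against both the fake and the real adapter. The sketch below illustrates that idea under simplifying assumptions: `SketchDocument`, `InMemoryStorage`, and `check_storage_contract` are hypothetical names, and the port shape (`save`, `get`, `find_by_user`) is inferred from the tests above rather than copied from `core/ports.py`.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class SketchDocument:
    """Simplified stand-in for core.domain.Document (assumption)."""
    document_id: str
    filename: str
    user_id: str
    status: str = "pending"


class DocumentStorage(Protocol):
    """Assumed shape of the driven port, inferred from the tests above."""
    def save(self, doc: SketchDocument) -> None: ...
    def get(self, document_id: str) -> Optional[SketchDocument]: ...
    def find_by_user(self, user_id: str) -> list[SketchDocument]: ...


class InMemoryStorage:
    """Fake adapter: a dict keyed by document_id."""
    def __init__(self) -> None:
        self._docs: dict[str, SketchDocument] = {}

    def save(self, doc: SketchDocument) -> None:
        self._docs[doc.document_id] = doc  # same ID overwrites (upsert)

    def get(self, document_id: str) -> Optional[SketchDocument]:
        return self._docs.get(document_id)

    def find_by_user(self, user_id: str) -> list[SketchDocument]:
        return [d for d in self._docs.values() if d.user_id == user_id]


def check_storage_contract(storage: DocumentStorage) -> None:
    """Port-level checks that any DocumentStorage adapter is expected to pass."""
    storage.save(SketchDocument("c-1", "a.pdf", "user-1"))
    storage.save(SketchDocument("c-2", "b.pdf", "user-2"))
    assert storage.get("c-1").filename == "a.pdf"
    assert storage.get("missing") is None
    # Saving again with the same ID must update the record, not duplicate it.
    storage.save(SketchDocument("c-1", "a.pdf", "user-1", status="processed"))
    assert storage.get("c-1").status == "processed"
    assert [d.document_id for d in storage.find_by_user("user-1")] == ["c-1"]


# Run the contract against the fake; an integration test could pass the
# real adapter to the same function.
check_storage_contract(InMemoryStorage())
```

If the fake and the real adapter both pass the same contract function, a green unit suite says more about production behaviour than it would with an unverified fake.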
Tier 3 - End-to-End Tests (Real App, All Adapters)
# tests/e2e/test_document_api.py
import os

import pytest
from fastapi.testclient import TestClient


@pytest.fixture(scope="session")
def client():
    # Requires all real services: Postgres, S3, Classifier
    os.environ.setdefault("DATABASE_URL", "postgresql://app:app@localhost/e2e_docs")
    os.environ.setdefault("S3_BUCKET", "e2e-docs")
    os.environ.setdefault("CLASSIFIER_URL", "http://localhost:8001")
    # Import after the environment is set so the composition root sees it
    from main import app
    return TestClient(app)


def test_upload_pdf_returns_201(client: TestClient):
    pdf_content = b"%PDF-1.4 1 0 obj << /Type /Catalog >> endobj"
    response = client.post(
        "/documents/?user_id=e2e-user",
        files={"file": ("test.pdf", pdf_content, "application/pdf")},
    )
    assert response.status_code == 201
    data = response.json()
    assert "document_id" in data
    assert data["status"] == "processed"
    assert data["category"] != ""


def test_get_own_document(client: TestClient):
    pdf_content = b"%PDF-1.4 simple"
    upload = client.post(
        "/documents/?user_id=e2e-user2",
        files={"file": ("doc.pdf", pdf_content, "application/pdf")},
    )
    doc_id = upload.json()["document_id"]
    response = client.get(f"/documents/{doc_id}?user_id=e2e-user2")
    assert response.status_code == 200
    assert response.json()["document_id"] == doc_id


def test_cannot_access_other_users_document(client: TestClient):
    pdf_content = b"%PDF private"
    upload = client.post(
        "/documents/?user_id=owner",
        files={"file": ("private.pdf", pdf_content, "application/pdf")},
    )
    doc_id = upload.json()["document_id"]
    response = client.get(f"/documents/{doc_id}?user_id=thief")
    assert response.status_code == 403
Testing Pyramid Summary
| Tier | What it covers | Speed | Dependencies | Share of test suite |
|---|---|---|---|---|
| Unit (core) | Business rules, validations, domain logic | Milliseconds | None (all fakes) | 80% |
| Integration (adapters) | Each adapter against real infrastructure | Seconds | Real DB, real S3, etc. | 15% |
| E2E (full stack) | User journeys through real API | Minutes | All services running | 5% |
Project Directory Layout
document-processing-service/
│
├── core/ # The hexagon - zero non-stdlib imports
│ ├── __init__.py
│ ├── domain.py # Document, ClassificationResult
│ ├── ports.py # DocumentStorage, FileArchive, TextExtractor, DocumentClassifier
│ └── use_cases.py # ProcessDocumentUseCase, GetDocumentUseCase
│
├── adapters/
│ ├── driving/ # Inbound adapters - call INTO core
│ │ ├── __init__.py
│ │ ├── fastapi_router.py # HTTP routes
│ │ ├── dependencies.py # FastAPI DI wiring
│ │ └── cli.py # CLI adapter
│ │
│ └── driven/ # Outbound adapters - called BY core
│ ├── __init__.py
│ ├── postgres_storage.py # PostgresDocumentStorage
│ ├── s3_archive.py # S3FileArchive
│ ├── local_storage.py # LocalFileStorage (dev)
│ ├── pdfminer_extractor.py # PdfMinerTextExtractor
│ └── http_classifier.py # HttpDocumentClassifier + LocalMLClassifier
│
├── tests/
│ ├── fakes.py # InMemory* and Stub* - satisfy all ports
│ ├── unit/
│ │ └── test_process_document.py
│ ├── integration/
│ │ ├── test_postgres_storage.py
│ │ └── test_s3_archive.py
│ └── e2e/
│ └── test_document_api.py
│
├── main.py # Composition root - production wiring
├── main_test.py # Composition root - test wiring with fakes
├── config.py # Profile-based adapter selection
├── pyproject.toml
└── docker-compose.yml # Postgres + LocalStack (S3) + classifier service
Import Direction Rule
The single rule that keeps hexagonal architecture intact:
core/ → NEVER imports from adapters/ or main.py
adapters/ → imports from core/ (ports and domain types) only
main.py → imports from both core/ and adapters/ (this is the only place allowed to)
tests/unit/ → imports from core/ and tests/fakes.py only (integration and e2e tests may also import adapters/ and main.py)
If you ever see from adapters.driven.postgres_storage import ... inside core/, the architecture is violated. A linter rule or import checker can enforce this automatically.
# pyproject.toml - enforce import boundaries with import-linter
[tool.importlinter]
root_packages = ["core", "adapters"]
# Needed so that a contract can forbid third-party packages such as fastapi
include_external_packages = true

[[tool.importlinter.contracts]]
name = "Core is independent"
type = "forbidden"
source_modules = ["core"]
forbidden_modules = ["adapters", "fastapi", "sqlalchemy", "boto3", "requests"]
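For readers who want to see what such a check does under the hood, here is a minimal stdlib-only sketch that uses the `ast` module to flag forbidden imports in a piece of source code. It is an illustration of the idea, not a replacement for import-linter; `find_forbidden_imports` and the sample source strings are hypothetical.

```python
import ast


def find_forbidden_imports(source: str, forbidden: set[str]) -> list[str]:
    """Return the forbidden top-level packages that `source` imports."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0:
            # level == 0 means an absolute import; relative imports stay inside core/
            names = [node.module or ""]
        else:
            continue
        for name in names:
            top_level = name.split(".")[0]
            if top_level in forbidden:
                hits.append(top_level)
    return sorted(set(hits))


FORBIDDEN = {"adapters", "fastapi", "sqlalchemy", "boto3", "requests"}

clean_core = "from dataclasses import dataclass\nfrom core.domain import Document\n"
leaky_core = (
    "import boto3\n"
    "from adapters.driven.postgres_storage import PostgresDocumentStorage\n"
)

print(find_forbidden_imports(clean_core, FORBIDDEN))  # []
print(find_forbidden_imports(leaky_core, FORBIDDEN))  # ['adapters', 'boto3']
```

In a real project the same function could be applied to every `*.py` file under `core/`, failing the build whenever the result is non-empty.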
Interview Patterns
Pattern 1 - "What is the difference between hexagonal architecture and MVC?"
Strong answer: "MVC is a presentation pattern - it organises the web layer into Model, View, Controller. Hexagonal Architecture is an application architecture pattern that goes further: it says your entire business logic should live in a framework-free core, and every I/O system - database, HTTP client, queue - should be behind an interchangeable adapter. MVC does not prevent your controller from importing SQLAlchemy. Hexagonal Architecture enforces that your core imports nothing outside the standard library."
Pattern 2 - "How do you test a service that calls S3 and PostgreSQL?"
Strong answer: "I would not write the service so that it calls S3 and PostgreSQL directly. I define FileArchive and DocumentStorage as Python Protocols in the core. In tests I inject InMemoryFileArchive and InMemoryDocumentStorage. The unit tests run in milliseconds with no external dependencies. I separately write adapter integration tests that verify S3FileArchive correctly implements FileArchive against a real bucket - or LocalStack in CI. The business rules and the adapter correctness are verified independently."
Pattern 3 - "Where would you add rate limiting in hexagonal architecture?"
Strong answer: "Rate limiting is a cross-cutting concern. In hexagonal architecture I would add it as a driving adapter - a middleware layer that sits between the HTTP framework and the use case. It does not belong in the domain core because rate limiting is an operational policy, not a business rule. It could also be a decorator around the use case callable, which keeps it composable and independently testable."
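The decorator variant from this answer might look like the sketch below. It is a fixed-window limiter under stated assumptions: the wrapped callable takes `user_id` as its first argument, and `rate_limited`, `RateLimitExceeded`, and `get_document` are hypothetical names, not part of the service shown earlier. The injectable `clock` is what keeps the wrapper testable without sleeping.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class RateLimitExceeded(Exception):
    """Raised when a caller exceeds the allowed number of calls."""


def rate_limited(
    use_case: Callable[..., T],
    max_calls: int,
    window_seconds: float,
    clock: Callable[[], float] = time.monotonic,
) -> Callable[..., T]:
    """Wrap a use case callable in a fixed-window limiter keyed by user_id."""
    windows: dict[str, tuple[float, int]] = {}  # user_id -> (window start, calls)

    def wrapper(user_id: str, *args, **kwargs) -> T:
        now = clock()
        start, calls = windows.get(user_id, (now, 0))
        if now - start >= window_seconds:
            start, calls = now, 0  # window expired: start a new one
        if calls >= max_calls:
            raise RateLimitExceeded(
                f"{user_id} exceeded {max_calls} calls per {window_seconds}s"
            )
        windows[user_id] = (start, calls + 1)
        return use_case(user_id, *args, **kwargs)

    return wrapper


# Hypothetical stand-in for a use case callable; the real signature may differ.
def get_document(user_id: str, document_id: str) -> str:
    return f"{document_id} (owner: {user_id})"


limited_get = rate_limited(get_document, max_calls=2, window_seconds=60.0)
limited_get("user-1", "doc-1")
limited_get("user-1", "doc-2")
try:
    limited_get("user-1", "doc-3")
except RateLimitExceeded as exc:
    print(exc)  # user-1 exceeded 2 calls per 60.0s
```

The core never learns that a limit exists: the composition root decides whether to hand the driving adapter the raw use case or the wrapped one.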
Pattern 4 - "What is a Port?"
Strong answer: "A Port is a Python Protocol or ABC that defines the interface between the core and the outside world. Ports have two varieties: driving ports define how external actors call into the core (though in practice these are usually just the use case class itself); driven ports define how the core calls outward to infrastructure - storage, external APIs, messaging. Both are abstractions that live in the core. The concrete implementations - PostgreSQL, S3, SMTP - are adapters that live outside the core and implement the port interface."
Pattern 5 - "Design a system where you can swap from PostgreSQL to DynamoDB without changing business logic"
Strong answer:
- Define a UserRepository Protocol in the core with get(user_id), save(user), find_by_email(email) methods
- Implement PostgresUserRepository in adapters/driven/
- Implement DynamoDBUserRepository in adapters/driven/
- Both implement the same Protocol
- The composition root (main.py) decides which to inject based on an environment variable
- The entire core - all business rules, all use cases - is untouched
- Unit tests use InMemoryUserRepository; adapter tests verify each implementation independently
This is the core value proposition of the Ports and Adapters pattern: the decision of which storage technology to use is made exactly once, in the composition root, and changed in exactly one place.
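A compact sketch of the answer above, using only the in-memory implementation so that it runs without a database. The `User` fields, `RegisterUserUseCase`, `build_user_repository`, and the `USER_REPOSITORY_BACKEND` variable are assumptions made for illustration; the PostgreSQL and DynamoDB adapters are indicated only as comments.

```python
import os
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class User:
    user_id: str
    email: str


class UserRepository(Protocol):
    """Driven port: lives in the core, knows nothing about storage technology."""
    def get(self, user_id: str) -> Optional[User]: ...
    def save(self, user: User) -> None: ...
    def find_by_email(self, email: str) -> Optional[User]: ...


class InMemoryUserRepository:
    """Adapter used in unit tests and local development."""
    def __init__(self) -> None:
        self._users: dict[str, User] = {}

    def get(self, user_id: str) -> Optional[User]:
        return self._users.get(user_id)

    def save(self, user: User) -> None:
        self._users[user.user_id] = user

    def find_by_email(self, email: str) -> Optional[User]:
        return next((u for u in self._users.values() if u.email == email), None)


class RegisterUserUseCase:
    """Hypothetical core rule: emails must be unique. Depends only on the port."""
    def __init__(self, users: UserRepository) -> None:
        self._users = users

    def execute(self, user_id: str, email: str) -> User:
        if self._users.find_by_email(email) is not None:
            raise ValueError(f"email already registered: {email}")
        user = User(user_id, email)
        self._users.save(user)
        return user


def build_user_repository(backend: str) -> UserRepository:
    """Composition-root factory: the only code that knows which backends exist."""
    factories = {
        "memory": InMemoryUserRepository,
        # "postgres": PostgresUserRepository,  # would live in adapters/driven/
        # "dynamodb": DynamoDBUserRepository,  # would live in adapters/driven/
    }
    if backend not in factories:
        raise ValueError(f"unknown repository backend: {backend}")
    return factories[backend]()


# USER_REPOSITORY_BACKEND is an assumed variable name for this sketch.
repo = build_user_repository(os.environ.get("USER_REPOSITORY_BACKEND", "memory"))
register = RegisterUserUseCase(repo)
```

Switching storage technology then means registering another adapter in the factory and changing one environment variable; `RegisterUserUseCase` is not edited.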
